Airflow で GCS のデータを BigQuery にロードしてみた
こんにちは、みかみです。
なんとなく Airflow をさわれるようになったものの、まだあまりバッチ処理の実行などの実処理部分を動かしてません。
Airflow で GCP 関連の処理実行してみたいなー。
やりたいこと
- Airflow の GCP 関連オペレーターにどんなものがあるのか知りたい
- Airflow で BigQuery にデータをロードしたい
Airflow の GCP 関連オペレーター
Airflow のJob( DAG )の実処理は Operator に記述されており、BashOperator や PythonOperator など、デフォルトでも様々なオペレーターが用意されていますが、GCP 関連のオペレーターもあるようです。
- Google Cloud Operators | Apache Airflow Documentation
- airflow.contrib.operators | Apache Airflow Documentation
GCE や GCS、Cloud SQL に BigQuery など、GCP 関連処理には十分対応できそうです。
組み合わせれば、BigQuery へのデータロードも問題なくできそうな予感。
サンプル DAG で、GCS から BigQuery にデータをロードしてみる
サンプルで準備されている DAG を確認していると、example_gcs_to_bq_operator.py を発見しました。
これはそのものズバリ、GCS から BigQuery にデータロードしてくれる DAG では? と思ったので、とりあえず動かしてみたい!
コード確認すると、以下の処理を行っているようです。
- BigQuery のデータセットを作成
- 1 で作成したデータセットに、GCS に配置済みのファイルデータをロード
- 2 でデータロードしたデータセットを削除
実際の動きを、Amaxon Linux2 にインストール済みの Airflow v1.10.9 環境で確認しました。
なお、サーバーから bq コマンドの実行ができて、BigQuery API を使うためのアカウントキーファイルがサーバー上に配置してあることを前提としております。
BigQuery の接続情報設定
Airflow 管理画面の「Admin」タブから、BigQuery 接続情報を設定します。
「Connections」画面から「bigquery_default」接続設定の編集アイコンをクリックします。
「Project Id」、「Keyfile Path」、「copes (comma separated)」の項目を入力し、「Save」ボタンをクリック。
なお、各項目の設定値は、以下のページを参考にしました。
GCS にロード用データを配置
GCS に test-cm-mikami バケット を作成し、その下に airflow ディレクトリを作成して、データファイルを配置しました。
ロードデータは、BigQuery のスタートガイドにリンクのあった、一般公開データセットの アメリカの赤ちゃんの名前データです。
サンプル DAG を配置して実行
以下のサンプル DAG を動かしてみます。
/airflow/contrib/example_dags/example_gcs_to_bq_operator.py
バケット名やソースデータファイルの配置場所、スキーマなどを編集し、さらにロード後のデータを確認するためデータセット削除処理はコメントアウトして、airflow.cfg の「dags_folder」で指定されているディレクトリに配置しました。
(省略) if gcs_to_bq is not None: args = { 'owner': 'Airflow', 'start_date': airflow.utils.dates.days_ago(2) } dag = models.DAG( dag_id='example_gcs_to_bq_operator', default_args=args, schedule_interval=None) create_test_dataset = bash_operator.BashOperator( task_id='create_airflow_test_dataset', bash_command='bq mk airflow_test', dag=dag) # [START howto_operator_gcs_to_bq] load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( task_id='gcs_to_bq_example', # mod mikami # bucket='cloud-samples-data', bucket='test-cm-mikami', # source_objects=['bigquery/us-states/us-states.csv'], source_objects=['airflow/yob2000.txt'], # destination_project_dataset_table='airflow_test.gcs_to_bq_table', destination_project_dataset_table='airflow_test.name_2000', # schema_fields=[ # {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'}, # {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'}, # ], schema_fields=[ {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'}, {'name': 'gender', 'type': 'STRING', 'mode': 'REQUIRED'}, {'name': 'count', 'type': 'INTEGER', 'mode': 'REQUIRED'}, ], write_disposition='WRITE_TRUNCATE', dag=dag) # [END howto_operator_gcs_to_bq] # del mikami # delete_test_dataset = bash_operator.BashOperator( # task_id='delete_airflow_test_dataset', # bash_command='bq rm -rf airflow_test', # dag=dag) # create_test_dataset >> load_csv >> delete_test_dataset create_test_dataset >> load_csv
管理画面から実行します。
実行完了後、GCP の管理画面から BigQuery を確認してみると・・・
無事、新しいデータセットが作成されて、GCS に配置したデータがロードされました。
つまずいたところ
サンプルで準備されてるほどだし、たぶん簡単に挙動確認できるでしょ? と、甘く見てましたが、サンプル DAG の正常終了までに、実は何度もつまずきました。。
サンプル DAG が表示されない
動作確認に使用した Amaxon Linux2 では、Airflow インストール前に bq コマンドや BigQuery API の動作確認してました。
google-cloud-sdk のインストールや API キーの認証設定も実施済みだったので、lib のインストールなども不要かと思い、初め何も考えずに サンプル DAG ファイルを配置したところ、Airflow 管理画面に example_gcs_to_bq_operator が表示されません。。
DAG のディレクトリがおかしい? Airflow Server 再起動しないとダメ? など考えましたが一向に表示されず。。
デバッグ目的で DAG ファイルを、Exception を Throw するように変更してみたところ・・・
(省略) gcs_to_bq = None # type: Any try: from airflow.contrib.operators import gcs_to_bq except ImportError: ## pass raise ImportError (省略)
まさに、ImportError になってました。。
[2020-03-13 04:18:54,290] {dagbag.py:246} ERROR - Failed to import: /home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py Traceback (most recent call last): File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 28, in <module> from airflow.contrib.operators import gcs_to_bq File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 22, in <module> from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcs_hook.py", line 25, in <module> from google.cloud import storage ModuleNotFoundError: No module named 'google' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/dagbag.py", line 243, in process_file m = imp.load_source(mod_name, filepath) File "/home/ec2-user/test_airflow/lib64/python3.7/imp.py", line 171, in load_source module = _load(spec) File "<frozen importlib._bootstrap>", line 696, in _load File "<frozen importlib._bootstrap>", line 677, in _load_unlocked File "<frozen importlib._bootstrap_external>", line 728, in exec_module File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed File "/home/ec2-user/airflow/dags/example_gcs_to_bq_operator.py", line 31, in <module> raise ImportError ImportError
ライブラリ足りなかったかなー。。 で、ちょっとちゃんとドキュメント読みました。。。
pip install 'apache-airflow[gcp]' 実行したら、ようやく Import エラー解消されて、DAG が表示されるようになりました。
ImportError: cannot import name opentype
google ライブラリは Import できるようになったものの、また出た。。
[2020-03-13 08:02:25,478] {bash_operator.py:115} INFO - Running command: bq mk airflow_test [2020-03-13 08:02:25,482] {bash_operator.py:122} INFO - Output: [2020-03-13 08:02:25,772] {bash_operator.py:126} INFO - Traceback (most recent call last): [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/bq.py", line 57, in <module> [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - import oauth2client_4_0.service_account [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/service_account.py", line 27, in <module> [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from oauth2client_4_0 import crypt [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/crypt.py", line 24, in <module> [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from oauth2client_4_0 import _pure_python_crypt [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/oauth2client_4_0/_pure_python_crypt.py", line 25, in <module> [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from pyasn1_modules.rfc2459 import Certificate [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/platform/bq/third_party/pyasn1_modules/rfc2459.py", line 21, in <module> [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - from pyasn1.type import opentype [2020-03-13 08:02:25,773] {bash_operator.py:126} INFO - ImportError: cannot import name opentype [2020-03-13 08:02:25,820] {bash_operator.py:130} INFO - Command exited with return code 1 [2020-03-13 08:02:25,830] {taskinstance.py:1128} ERROR - Bash command failed Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task result = task_copy.execute(context=context) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute raise AirflowException("Bash command failed") airflow.exceptions.AirflowException: Bash command failed
google 先生に聞いてみたら、pyasn1 のバージョン問題の可能性が高い? と思って lib をアップデートしようとしたけど、依存関係があるため上がらず。。
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip install -U pyasn1 pyasn1-modules Requirement already up-to-date: pyasn1 in ./test_airflow/lib/python3.7/site-packages (0.4.8) Requirement already up-to-date: pyasn1-modules in ./test_airflow/lib/python3.7/site-packages (0.2.8) (test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip list | grep pyasn1 pyasn1 0.4.8 pyasn1-modules 0.2.8
pyasn1.type の opentype がいないと怒られてるけど、実ソース確認したら、ちゃんといるんだけどなぁ。。。
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ pip show pyasn1 Name: pyasn1 Version: 0.4.8 Summary: ASN.1 types and codecs Home-page: https://github.com/etingof/pyasn1 Author: Ilya Etingof Author-email: etingof@gmail.com License: BSD Location: /home/ec2-user/test_airflow/lib/python3.7/site-packages Requires: Required-by: rsa, pyasn1-modules (test_airflow) [ec2-user@ip-10-0-43-239 type]$ pwd /home/ec2-user/test_airflow/lib/python3.7/site-packages/pyasn1/type (test_airflow) [ec2-user@ip-10-0-43-239 type]$ ls -l total 232 -rw-rw-r-- 1 ec2-user ec2-user 22386 Mar 13 04:28 base.py -rw-rw-r-- 1 ec2-user ec2-user 11397 Mar 13 04:28 char.py -rw-rw-r-- 1 ec2-user ec2-user 22132 Mar 13 04:28 constraint.py -rw-rw-r-- 1 ec2-user ec2-user 246 Mar 13 04:28 error.py -rw-rw-r-- 1 ec2-user ec2-user 59 Mar 13 04:28 __init__.py -rw-rw-r-- 1 ec2-user ec2-user 16368 Mar 13 04:28 namedtype.py -rw-rw-r-- 1 ec2-user ec2-user 4886 Mar 13 04:28 namedval.py -rw-rw-r-- 1 ec2-user ec2-user 2848 Mar 13 04:28 opentype.py drwxrwxr-x 2 ec2-user ec2-user 4096 Mar 13 04:28 __pycache__ -rw-rw-r-- 1 ec2-user ec2-user 2998 Mar 13 04:28 tagmap.py -rw-rw-r-- 1 ec2-user ec2-user 9486 Mar 13 04:28 tag.py -rw-rw-r-- 1 ec2-user ec2-user 108921 Mar 13 04:28 univ.py -rw-rw-r-- 1 ec2-user ec2-user 5368 Mar 13 04:28 useful.py
どっか別のところ見てる?( venv 環境変えたりしてるのが影響してるのかなぁ・・・?
結局、google-cloud-sdk をインストールしなおして解決しました。
ERROR: (bq) You do not currently have an active account selected.
サンプル DAG では、まず bq コマンドで BigQuery のデータセットを作成します。
が、コマンド実行でアカウント設定エラー。。
[2020-03-13 09:19:52,097] {bash_operator.py:115} INFO - Running command: bq mk airflow_test [2020-03-13 09:19:52,101] {bash_operator.py:122} INFO - Output: [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - ERROR: (bq) You do not currently have an active account selected. [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - Please run: [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - $ gcloud auth login [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - [2020-03-13 09:19:52,699] {bash_operator.py:126} INFO - to obtain new credentials, or if you have already logged in with a [2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - different account: [2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - [2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - $ gcloud config set account ACCOUNT [2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - [2020-03-13 09:19:52,700] {bash_operator.py:126} INFO - to select an already authenticated account to use. [2020-03-13 09:19:52,797] {bash_operator.py:130} INFO - Command exited with return code 1 [2020-03-13 09:19:52,810] {taskinstance.py:1128} ERROR - Bash command failed Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task result = task_copy.execute(context=context) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute raise AirflowException("Bash command failed") airflow.exceptions.AirflowException: Bash command failed
コマンドラインから直接 bq コマンド実行しても、同じエラー発生。 google-cloud-sdk インストールし直したので、そりゃそうですよね。。
認証情報設定のため、gcloud init コマンド実行して、解決。
OSError: [Errno 12] Cannot allocate memory
なんですと? メモリが足りないですと? そんな高スペックが必要なのかしら?(メモリ関連のエラーって、けっこうドキッとしてしまいます。。。
[2020-03-13 09:40:30,323] {bash_operator.py:115} INFO - Running command: bq mk airflow_test [2020-03-13 09:40:30,327] {bash_operator.py:122} INFO - Output: [2020-03-13 09:40:31,269] {bash_operator.py:126} INFO - Traceback (most recent call last): [2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 83, in <module> [2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - exceptions.HandleError(e, 'bq') [2020-03-13 09:40:31,271] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/calliope/exceptions.py", line 526, in HandleError [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - core_exceptions.reraise(exc) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/exceptions.py", line 111, in reraise [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - six.reraise(type(exc_value), exc_value, tb) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 81, in <module> [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - main() [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bq.py", line 76, in main [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - 'platform/bq', 'bq.py', *args) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 44, in ExecutePythonTool [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - execution_utils.ArgsForPythonTool(_FullPath(tool_dir, exec_name), *args)) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/bin/bootstrapping/bootstrapping.py", line 105, in _ExecuteTool [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - execution_utils.Exec(args + sys.argv[1:], env=_GetToolEnv()) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/home/ec2-user/google-cloud-sdk/lib/googlecloudsdk/core/execution_utils.py", line 300, in Exec [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - p = subprocess.Popen(args, env=env, **extra_popen_kwargs) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/usr/lib64/python2.7/subprocess.py", line 394, in __init__ [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - errread, errwrite) [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - File "/usr/lib64/python2.7/subprocess.py", line 938, in _execute_child [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - self.pid = os.fork() [2020-03-13 09:40:31,272] {bash_operator.py:126} INFO - OSError: [Errno 12] Cannot allocate memory [2020-03-13 09:40:31,323] {bash_operator.py:130} INFO - Command exited with return code 1 [2020-03-13 09:40:31,339] {taskinstance.py:1128} ERROR - Bash command failed Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task result = task_copy.execute(context=context) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 134, in execute raise AirflowException("Bash command failed") airflow.exceptions.AirflowException: Bash command failed
こちらも bq コマンド実行時のエラーだったので、試しにまたコンソールから直接 bq コマンド実行してみると、今度はすんなり通った。。(なら、Airflow からでも行けるんじゃない・・・?
で、実行中のプロセス確認してみると・・・
(test_airflow) [ec2-user@ip-10-0-43-239 ~]$ ps aux | grep airflow ec2-user 4372 0.1 7.0 415996 70888 ? S 04:03 0:24 airflow scheduler -- DagFileProcessorManager ec2-user 11597 0.1 7.9 455620 80092 ? S 07:27 0:14 airflow scheduler -- DagFileProcessorManager ec2-user 12521 0.1 8.7 449744 88392 pts/2 S+ 07:55 0:10 /home/ec2-user/test_airflow/bin/python3 /home/ec2-user/test_airflow/bin/airflo webserver -p 8080 ec2-user 12524 0.0 6.8 421624 68988 pts/2 S+ 07:55 0:02 gunicorn: master [airflow-webserver] ec2-user 15880 0.0 7.9 455636 80140 ? S 09:19 0:02 airflow scheduler -- DagFileProcessorManager ec2-user 17714 0.3 9.0 449976 91220 pts/2 S+ 10:08 0:00 [ready] gunicorn: worker [airflow-webserver] ec2-user 17730 0.5 9.0 449976 91224 pts/2 S+ 10:08 0:00 [ready] gunicorn: worker [airflow-webserver] ec2-user 17748 0.8 9.0 449976 91220 pts/2 S+ 10:09 0:00 [ready] gunicorn: worker [airflow-webserver] ec2-user 17766 2.6 9.0 449980 91216 pts/2 S+ 10:09 0:00 [ready] gunicorn: worker [airflow-webserver] ec2-user 17778 0.0 0.0 119416 964 pts/5 S+ 10:10 0:00 grep --color=auto airflow
airflow scheduler ってこんなに多かったっけ? デーモンが残っているようです。。
念のため、サーバー再起動したら、エラー出なくなりました。
ERROR - INTERNAL: No default project is specified
うん、そういえば、GCP のプロジェクトID、どこでも指定してなかった気がするw
[2020-03-13 10:14:28,680] {taskinstance.py:1128} ERROR - INTERNAL: No default project is specified Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task result = task_copy.execute(context=context) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 288, in execute encryption_configuration=self.encryption_configuration) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1212, in run_load var_name='destination_project_dataset_table') File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 2189, in _split_tablename raise ValueError("INTERNAL: No default project is specified") ValueError: INTERNAL: No default project is specified
どこで設定すればいいんだっけ? と調べて、Airflow 管理画面から各種接続設定ができることを知りました。
ERROR - Invalid key JSON.
まだ正常系通らない。。
[2020-03-13 11:06:56,491] {taskinstance.py:1128} ERROR - Invalid key JSON. Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 165, in _get_credentials keyfile_dict = json.loads(keyfile_dict) File "/usr/lib64/python3.7/json/__init__.py", line 348, in loads return _default_decoder.decode(s) File "/usr/lib64/python3.7/json/decoder.py", line 337, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File "/usr/lib64/python3.7/json/decoder.py", line 355, in raw_decode raise JSONDecodeError("Expecting value", s, err.value) from None json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task result = task_copy.execute(context=context) File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/operators/gcs_to_bq.py", line 246, in execute conn = bq_hook.get_conn() File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 68, in get_conn service = self.get_service() File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 82, in get_service http_authorized = self._authorize() File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 195, in _authorize credentials = self._get_credentials() File "/home/ec2-user/test_airflow/lib/python3.7/site-packages/airflow/contrib/hooks/gcp_api_base_hook.py", line 179, in _get_credentials raise AirflowException('Invalid key JSON.') airflow.exceptions.AirflowException: Invalid key JSON.
例外吐いてるソースコードは以下。
(省略) if not key_path and not keyfile_dict: self.log.info('Getting connection using `google.auth.default()` ' 'since no key file is defined for hook.') credentials, _ = google.auth.default(scopes=scopes) elif key_path: # Get credentials from a JSON file. if key_path.endswith('.json'): self.log.debug('Getting connection using JSON key file %s' % key_path) credentials = ( google.oauth2.service_account.Credentials.from_service_account_file( key_path, scopes=scopes) ) elif key_path.endswith('.p12'): raise AirflowException('Legacy P12 key file are not supported, ' 'use a JSON key file.') else: raise AirflowException('Unrecognised extension for key file.') else: # Get credentials from JSON data provided in the UI. try: print(keyfile_dict) keyfile_dict = json.loads(keyfile_dict) # Depending on how the JSON was formatted, it may contain # escaped newlines. Convert those to actual newlines. keyfile_dict['private_key'] = keyfile_dict['private_key'].replace( '\\n', '\n') credentials = ( google.oauth2.service_account.Credentials.from_service_account_info( keyfile_dict, scopes=scopes) ) except json.decoder.JSONDecodeError: raise AirflowException('Invalid key JSON.') (省略)
JSON て、BigQuery API のアクセスキーファイルのことだよね? ファイルの中身のJSON構文がおかしい? と調べてみたけど問題なし。
デバッグプリント追加して keyfile_dict の値見てみたら、アクセスキーのファイルパスになってる。。
Airflow 管理画面からの接続設定で、ファイルパス書く場所間違ってました。。
- 誤:Keyfile JSON
- 正:Keyfile Path
まとめ(所感)
Airflow も GCP もまだあまり深く理解していない状態ですが、比較的簡単に目的達成できました。
既存のオペレーターを参考にして、他にもやりたいことに合わせて処理拡張できそうです。
各サービスの接続情報を画面から簡単に設定できるUIもあるので、Airflow を使えばいろいろなサービスと連携できて、バッチ処理の幅も広がりそうです。